/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package com.feilong.lib.org.apache.http.pool;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.feilong.lib.org.apache.http.annotation.Contract;
import com.feilong.lib.org.apache.http.annotation.ThreadingBehavior;
import com.feilong.lib.org.apache.http.concurrent.FutureCallback;
import com.feilong.lib.org.apache.http.util.Args;
import com.feilong.lib.org.apache.http.util.Asserts;

/**
 * Abstract synchronous (blocking) pool of connections.
 * <p>
 * Please note that this class does not maintain its own pool of execution {@link Thread}s.
 * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
 * method on the {@link Future} object returned by the
 * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
 * to complete.
 *
 * @param <T>
 *            the route type that represents the opposite endpoint of a pooled
 *            connection.
 * @param <C>
 *            the connection type.
 * @param <E>
 *            the type of the pool entry containing a pooled connection.
 * @since 4.2
 */
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>> implements ConnPool<T, E>,ConnPoolControl<T>{

    private final Lock                               lock;

    private final Condition                          condition;

    private final ConnFactory<T, C>                  connFactory;

    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;

    private final Set<E>                             leased;

    private final LinkedList<E>                      available;

    private final LinkedList<Future<E>>              pending;

    private final Map<T, Integer>                    maxPerRoute;

    private volatile boolean                         isShutDown;

    private volatile int                             defaultMaxPerRoute;

    private volatile int                             maxTotal;

    private volatile int                             validateAfterInactivity;

    public AbstractConnPool(final ConnFactory<T, C> connFactory, final int defaultMaxPerRoute, final int maxTotal){
        super();
        this.connFactory = Args.notNull(connFactory, "Connection factory");
        this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
        this.maxTotal = Args.positive(maxTotal, "Max total value");
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.routeToPool = new HashMap<>();
        this.leased = new HashSet<>();
        this.available = new LinkedList<>();
        this.pending = new LinkedList<>();
        this.maxPerRoute = new HashMap<>();
    }

    /**
     * Creates a new entry for the given connection with the given route.
     */
    protected abstract E createEntry(T route,C conn);

    /**
     * @since 4.3
     */
    protected void onLease(final E entry){
    }

    /**
     * @since 4.3
     */
    protected void onRelease(final E entry){
    }

    /**
     * @since 4.4
     */
    protected void onReuse(final E entry){
    }

    /**
     * @since 4.4
     */
    protected boolean validate(final E entry){
        return true;
    }

    public boolean isShutdown(){
        return this.isShutDown;
    }

    /**
     * Shuts down the pool.
     */
    public void shutdown() throws IOException{
        if (this.isShutDown){
            return;
        }
        this.isShutDown = true;
        this.lock.lock();
        try{
            for (final E entry : this.available){
                entry.close();
            }
            for (final E entry : this.leased){
                entry.close();
            }
            for (final RouteSpecificPool<T, C, E> pool : this.routeToPool.values()){
                pool.shutdown();
            }
            this.routeToPool.clear();
            this.leased.clear();
            this.available.clear();
        }finally{
            this.lock.unlock();
        }
    }

    private RouteSpecificPool<T, C, E> getPool(final T route){
        RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
        if (pool == null){
            pool = new RouteSpecificPool<T, C, E>(route){

                @Override
                protected E createEntry(final C conn){
                    return AbstractConnPool.this.createEntry(route, conn);
                }

            };
            this.routeToPool.put(route, pool);
        }
        return pool;
    }

    private static Exception operationAborted(){
        return new CancellationException("Operation aborted");
    }

    /**
     * {@inheritDoc}
     * <p>
     * Please note that this class does not maintain its own pool of execution
     * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
     * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
     * returned by this method in order for the lease operation to complete.
     */
    @Override
    public Future<E> lease(final T route,final Object state,final FutureCallback<E> callback){
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");

        return new Future<E>(){

            private final AtomicBoolean      cancelled = new AtomicBoolean(false);

            private final AtomicBoolean      done      = new AtomicBoolean(false);

            private final AtomicReference<E> entryRef  = new AtomicReference<>(null);

            @Override
            public boolean cancel(final boolean mayInterruptIfRunning){
                if (done.compareAndSet(false, true)){
                    cancelled.set(true);
                    lock.lock();
                    try{
                        condition.signalAll();
                    }finally{
                        lock.unlock();
                    }
                    if (callback != null){
                        callback.cancelled();
                    }
                    return true;
                }
                return false;
            }

            @Override
            public boolean isCancelled(){
                return cancelled.get();
            }

            @Override
            public boolean isDone(){
                return done.get();
            }

            @Override
            public E get() throws InterruptedException,ExecutionException{
                try{
                    return get(0L, TimeUnit.MILLISECONDS);
                }catch (final TimeoutException ex){
                    throw new ExecutionException(ex);
                }
            }

            @Override
            public E get(final long timeout,final TimeUnit timeUnit) throws InterruptedException,ExecutionException,TimeoutException{
                for (;;){
                    synchronized (this){
                        try{
                            final E entry = entryRef.get();
                            if (entry != null){
                                return entry;
                            }
                            if (done.get()){
                                throw new ExecutionException(operationAborted());
                            }
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                            if (validateAfterInactivity > 0){
                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()){
                                    if (!validate(leasedEntry)){
                                        leasedEntry.close();
                                        release(leasedEntry, false);
                                        continue;
                                    }
                                }
                            }
                            if (done.compareAndSet(false, true)){
                                entryRef.set(leasedEntry);
                                done.set(true);
                                onLease(leasedEntry);
                                if (callback != null){
                                    callback.completed(leasedEntry);
                                }
                                return leasedEntry;
                            }
                            release(leasedEntry, true);
                            throw new ExecutionException(operationAborted());
                        }catch (final IOException ex){
                            if (done.compareAndSet(false, true)){
                                if (callback != null){
                                    callback.failed(ex);
                                }
                            }
                            throw new ExecutionException(ex);
                        }
                    }
                }
            }

        };
    }

    /**
     * Attempts to lease a connection for the given route and with the given
     * state from the pool.
     * <p>
     * Please note that this class does not maintain its own pool of execution
     * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
     * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
     * returned by this method in order for the lease operation to complete.
     *
     * @param route
     *            route of the connection.
     * @param state
     *            arbitrary object that represents a particular state
     *            (usually a security principal or a unique token identifying
     *            the user whose credentials have been used while establishing the connection).
     *            May be {@code null}.
     * @return future for a leased pool entry.
     */
    public Future<E> lease(final T route,final Object state){
        return lease(route, state, null);
    }

    private E getPoolEntryBlocking(final T route,final Object state,final long timeout,final TimeUnit timeUnit,final Future<E> future)
                    throws IOException,InterruptedException,ExecutionException,TimeoutException{

        Date deadline = null;
        if (timeout > 0){
            deadline = new Date(System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try{
            E entry;
            for (;;){
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                if (future.isCancelled()){
                    throw new ExecutionException(operationAborted());
                }
                final RouteSpecificPool<T, C, E> pool = getPool(route);
                for (;;){
                    entry = pool.getFree(state);
                    if (entry == null){
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())){
                        entry.close();
                    }
                    if (entry.isClosed()){
                        this.available.remove(entry);
                        pool.free(entry, false);
                    }else{
                        break;
                    }
                }
                if (entry != null){
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0){
                    for (int i = 0; i < excess; i++){
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null){
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute){
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0){
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1){
                            final E lastUsed = this.available.removeLast();
                            lastUsed.close();
                            final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                            otherpool.remove(lastUsed);
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try{
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null){
                        success = this.condition.awaitUntil(deadline);
                    }else{
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()){
                        throw new ExecutionException(operationAborted());
                    }
                }finally{
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())){
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public void release(final E entry,final boolean reusable){
        this.lock.lock();
        try{
            if (this.leased.remove(entry)){
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown){
                    this.available.addFirst(entry);
                }else{
                    entry.close();
                }
                onRelease(entry);
                Future<E> future = pool.nextPending();
                if (future != null){
                    this.pending.remove(future);
                }else{
                    future = this.pending.poll();
                }
                if (future != null){
                    this.condition.signalAll();
                }
            }
        }finally{
            this.lock.unlock();
        }
    }

    private int getMax(final T route){
        final Integer v = this.maxPerRoute.get(route);
        return v != null ? v : this.defaultMaxPerRoute;
    }

    @Override
    public void setMaxTotal(final int max){
        Args.positive(max, "Max value");
        this.lock.lock();
        try{
            this.maxTotal = max;
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public int getMaxTotal(){
        this.lock.lock();
        try{
            return this.maxTotal;
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public void setDefaultMaxPerRoute(final int max){
        Args.positive(max, "Max per route value");
        this.lock.lock();
        try{
            this.defaultMaxPerRoute = max;
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public int getDefaultMaxPerRoute(){
        this.lock.lock();
        try{
            return this.defaultMaxPerRoute;
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public void setMaxPerRoute(final T route,final int max){
        Args.notNull(route, "Route");
        this.lock.lock();
        try{
            if (max > -1){
                this.maxPerRoute.put(route, max);
            }else{
                this.maxPerRoute.remove(route);
            }
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public int getMaxPerRoute(final T route){
        Args.notNull(route, "Route");
        this.lock.lock();
        try{
            return getMax(route);
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public PoolStats getTotalStats(){
        this.lock.lock();
        try{
            return new PoolStats(this.leased.size(), this.pending.size(), this.available.size(), this.maxTotal);
        }finally{
            this.lock.unlock();
        }
    }

    @Override
    public PoolStats getStats(final T route){
        Args.notNull(route, "Route");
        this.lock.lock();
        try{
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            return new PoolStats(pool.getLeasedCount(), pool.getPendingCount(), pool.getAvailableCount(), getMax(route));
        }finally{
            this.lock.unlock();
        }
    }

    /**
     * Returns snapshot of all knows routes
     * 
     * @return the set of routes
     *
     * @since 4.4
     */
    public Set<T> getRoutes(){
        this.lock.lock();
        try{
            return new HashSet<>(routeToPool.keySet());
        }finally{
            this.lock.unlock();
        }
    }

    /**
     * Enumerates all available connections.
     *
     * @since 4.3
     */
    protected void enumAvailable(final PoolEntryCallback<T, C> callback){
        this.lock.lock();
        try{
            final Iterator<E> it = this.available.iterator();
            while (it.hasNext()){
                final E entry = it.next();
                callback.process(entry);
                if (entry.isClosed()){
                    final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                    pool.remove(entry);
                    it.remove();
                }
            }
            purgePoolMap();
        }finally{
            this.lock.unlock();
        }
    }

    /**
     * Enumerates all leased connections.
     *
     * @since 4.3
     */
    protected void enumLeased(final PoolEntryCallback<T, C> callback){
        this.lock.lock();
        try{
            final Iterator<E> it = this.leased.iterator();
            while (it.hasNext()){
                final E entry = it.next();
                callback.process(entry);
            }
        }finally{
            this.lock.unlock();
        }
    }

    private void purgePoolMap(){
        final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
        while (it.hasNext()){
            final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
            final RouteSpecificPool<T, C, E> pool = entry.getValue();
            if (pool.getPendingCount() + pool.getAllocatedCount() == 0){
                it.remove();
            }
        }
    }

    /**
     * Closes connections that have been idle longer than the given period
     * of time and evicts them from the pool.
     *
     * @param idletime
     *            maximum idle time.
     * @param timeUnit
     *            time unit.
     */
    public void closeIdle(final long idletime,final TimeUnit timeUnit){
        Args.notNull(timeUnit, "Time unit");
        long time = timeUnit.toMillis(idletime);
        if (time < 0){
            time = 0;
        }
        final long deadline = System.currentTimeMillis() - time;
        enumAvailable(entry -> {
            if (entry.getUpdated() <= deadline){
                entry.close();
            }
        });
    }

    /**
     * Closes expired connections and evicts them from the pool.
     */
    public void closeExpired(){
        final long now = System.currentTimeMillis();
        enumAvailable(entry -> {
            if (entry.isExpired(now)){
                entry.close();
            }
        });
    }

    /**
     * @return the number of milliseconds
     * @since 4.4
     */
    public int getValidateAfterInactivity(){
        return this.validateAfterInactivity;
    }

    /**
     * @param ms
     *            the number of milliseconds
     * @since 4.4
     */
    public void setValidateAfterInactivity(final int ms){
        this.validateAfterInactivity = ms;
    }

    @Override
    public String toString(){
        this.lock.lock();
        try{
            final StringBuilder buffer = new StringBuilder();
            buffer.append("[leased: ");
            buffer.append(this.leased);
            buffer.append("][available: ");
            buffer.append(this.available);
            buffer.append("][pending: ");
            buffer.append(this.pending);
            buffer.append("]");
            return buffer.toString();
        }finally{
            this.lock.unlock();
        }
    }

}
